/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.hadoop.fs.Path

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.Utils

class EvolvabilitySuite extends EvolvabilitySuiteBase with DeltaSQLCommandTest {

  import testImplicits._

  test("delta 0.1.0") {
    testEvolvability("src/test/resources/delta/delta-0.1.0")
  }

  test("delta 0.1.0 - case sensitivity enabled") {
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
      testEvolvability("src/test/resources/delta/delta-0.1.0")
    }
  }

  test("serialized partition values must contain null values") {
    val tempDirFile = Utils.createTempDir()
    try {
      val tempDir = tempDirFile.toString
      val df1 = spark.range(5).withColumn("part", lit(null).cast(StringType))
      val df2 = spark.range(5).withColumn("part", lit("1"))
      df1.union(df2).coalesce(1).write.partitionBy("part").format("delta").save(tempDir)

      // Clear the cache so the log is re-read from storage below.
      DeltaLog.clearCache()
      val deltaLog = DeltaLog.forTable(spark, tempDir)

      val partValues = deltaLog.snapshot.allFiles.collect().map { addFile =>
        addFile.partitionValues.getOrElse("part", fail(
          s"The partition values: ${addFile.partitionValues} didn't contain the column 'part'."))
      }
      // Comparing the set of values (instead of checking each file with `forall`) also fails
      // when the table has no files at all or when one of the two partitions is missing.
      assert(partValues.toSet === Set[String](null, "1"),
        "Partition values didn't match with null or '1'")

      // Check serialized JSON as well
      val contents = deltaLog.store.read(
        FileNames.unsafeDeltaFile(deltaLog.logPath, 0L),
        deltaLog.newDeltaHadoopConf())
      assert(contents.exists(_.contains(""""part":null""")), "null value should be written in json")
    } finally {
      Utils.deleteRecursively(tempDirFile)
    }
  }

  testQuietly("parse old version LastCheckpointInfo") {
    assert(JsonUtils.mapper.readValue[LastCheckpointInfo]("""{"version":1,"size":1}""")
      === LastCheckpointInfo(1, 1, None, None, None, None))
  }

  test("parse partial version LastCheckpointInfo") {
    assert(JsonUtils.mapper.readValue[LastCheckpointInfo](
      """{"version":1,"size":1,"parts":100}""") ===
      LastCheckpointInfo(1, 1, Some(100), None, None, None))
  }

  // The following tests verify that operations on a Delta table won't fail when there is an
  // unknown column in Delta files and checkpoints.
  // The modified Delta files and checkpoints with an extra column are generated by
  // `EvolvabilitySuiteBase.generateTransactionLogWithExtraColumn()`.

  /**
   * Runs `f` with Change Data Feed enabled by default for new tables and with the
   * all-files-in-CRC verification turned off.
   *
   * The verification must be off for tables produced by
   * `EvolvabilitySuiteBase.generateTransactionLogWithExtraColumn()`. The extra column is not
   * present in the `allFiles` recorded in the CRC, so the verification would always fail.
   */
  private def withChangeDataFeedAndNoCrcVerification(f: => Unit): Unit = {
    withSQLConf(
      DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true",
      DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key -> "false",
      DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED.key ->
        "false"
    )(f)
  }

  /** Processes all data currently available to `query`, then stops it even if that fails. */
  private def processAllAvailableAndStop(query: StreamingQuery): Unit = {
    try {
      query.processAllAvailable()
    } finally {
      query.stop()
    }
  }

  /**
   * Expected CDF pre-images for `UPDATE ... SET value = 10` in the change data tests:
   * every value from 1 to 9, each appearing twice.
   */
  private def expectedCdfPreimage: Seq[Int] = (1 until 10).flatMap(x => Seq(x, x))

  /** Expected CDF post-images: one row with value 10 per expected pre-image row. */
  private def expectedCdfPostimage: Seq[Int] = Seq.fill(expectedCdfPreimage.size)(10)

  test("transaction log schema evolvability - batch change data read") {
    withTempDir { dir =>
      withChangeDataFeedAndNoCrcVerification {
        val path = dir.getAbsolutePath
        EvolvabilitySuiteBase.generateTransactionLogWithExtraColumn(spark, path)
        spark.sql(s"UPDATE delta.`$path` SET value = 10")
        spark.read.format("delta").option("readChangeFeed", "true")
          .option("startingVersion", 0).load(path).collect()

        testCdfUpdate(path, 6, expectedCdfPreimage, expectedCdfPostimage)
      }
    }
  }

  test("transaction log schema evolvability - streaming change data read") {
    withTempDir { dir =>
      withChangeDataFeedAndNoCrcVerification {
        val path = dir.getAbsolutePath
        EvolvabilitySuiteBase.generateTransactionLogWithExtraColumn(spark, path)
        spark.sql(s"UPDATE delta.`$path` SET value = 10")
        processAllAvailableAndStop(
          spark.readStream.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", 0)
            .load(path)
            .writeStream.format("noop").start())

        testCdfUpdate(path, 6, expectedCdfPreimage, expectedCdfPostimage, true)
      }
    }
  }

  test("transaction log schema evolvability - batch read") {
    testLogSchemaEvolvability(
      (path: String) => { spark.read.format("delta").load(path).collect() }
    )
  }

  test("transaction log schema evolvability - batch write") {
    testLogSchemaEvolvability(
      (path: String) => {
        (10 until 20).map(num => (num, num)).toDF("key", "value")
          .write.format("delta").mode("append").save(path)
        spark.read.format("delta").load(path).collect()
      }
    )
  }

  test("transaction log schema evolvability - streaming read") {
    testLogSchemaEvolvability(
      (path: String) => {
        processAllAvailableAndStop(
          spark.readStream.format("delta").load(path).writeStream.format("noop").start())
      }
    )
  }

  test("transaction log schema evolvability - streaming write") {
    testLogSchemaEvolvability(
      (path: String) => {
        withTempDir { tempDir =>
          val memStream = MemoryStream[(Int, Int)]
          memStream.addData((11, 11), (12, 12))
          processAllAvailableAndStop(
            memStream.toDS().toDF("key", "value")
              .coalesce(1).writeStream
              .format("delta")
              .trigger(Trigger.Once)
              .outputMode("append")
              .option("checkpointLocation", tempDir.getCanonicalPath + "/cp")
              .start(path))
        }
      }
    )
  }

  test("transaction log schema evolvability - describe commands") {
    testLogSchemaEvolvability(
      (path: String) => {
        // Collect the results so each command's output is actually materialized.
        spark.sql(s"DESCRIBE delta.`$path`").collect()
        spark.sql(s"DESCRIBE HISTORY delta.`$path`").collect()
        spark.sql(s"DESCRIBE DETAIL delta.`$path`").collect()
      }
    )
  }

  test("transaction log schema evolvability - vacuum") {
    testLogSchemaEvolvability(
      (path: String) => {
        sql(s"VACUUM delta.`$path`")
      }
    )
  }

  test("transaction log schema evolvability - alter table") {
    testLogSchemaEvolvability(
      (path: String) => {
        sql(s"ALTER TABLE delta.`$path` ADD COLUMNS (col int)")
      }
    )
  }

  test("transaction log schema evolvability - delete") {
    testLogSchemaEvolvability(
      (path: String) => { sql(s"DELETE FROM delta.`$path` WHERE key = 1") }
    )
  }

  test("transaction log schema evolvability - update") {
    testLogSchemaEvolvability(
      (path: String) => { sql(s"UPDATE delta.`$path` set value = 100 WHERE key = 1") }
    )
  }

  test("transaction log schema evolvability - merge") {
    testLogSchemaEvolvability(
      (path: String) => {
        withTable("source") {
          Seq((1, 5), (11, 12))
            .toDF("key", "value")
            .write
            .mode("overwrite")
            .format("delta")
            .saveAsTable("source")
          sql(
            s"""
               |MERGE INTO delta.`$path` tgrt
               |USING source src
               |ON src.key = tgrt.key
               |WHEN MATCHED THEN
               |  UPDATE SET key = 20 + src.key, value = 20 + src.value
               |WHEN NOT MATCHED THEN
               |  INSERT (key, value) VALUES (src.key + 5, src.value + 10)
           """.stripMargin
          )
        }
      }
    )
  }

  test("Delta Lake issue 1229: able to read a checkpoint containing `numRecords`") {
    // The table was created using Delta 1.2.1, which has an additional field `numRecords` in
    // the checkpoint schema. The field was removed in versions after 1.2.1.
    // Make sure we are able to read the Delta table in the latest version.
    val tablePath = "src/test/resources/delta/delta-1.2.1"
    assert(
      spark.read.format("delta")
        .load(tablePath).where("col1 = 8").count() === 9L)
  }
}
